package com.zhang.gmall.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.zhang.gmall.realtime.app.function.DimSinkFunction;
import com.zhang.gmall.realtime.app.function.FlinkCDC_WithDeserialization;
import com.zhang.gmall.realtime.app.function.TableProcessFunction;
import com.zhang.gmall.realtime.bean.TableProcess;
import com.zhang.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/**
 * @title: BaseDBApp - splits the ODS business-data stream into Kafka (DWD facts)
 *         and Phoenix/HBase (DIM dimensions), driven by a broadcast config table
 * @author: zhang
 * @date: 2022/1/1 20:02
 */

//Data flow: web/app -> nginx -> SpringBoot -> MySQL -> FlinkApp -> Kafka(ods) -> FlinkApp -> Kafka(dwd)/Phoenix(dim)
//Programs:            mockDb -> MySQL -> FlinkCDC -> Kafka(ZK) -> BaseDBApp -> Kafka/Phoenix(hbase,zk,hdfs)
public class BaseDBApp {
    public static void main(String[] args) throws Exception {
        //TODO 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
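        // NOTE: parallelism 1 keeps local console output ordered; in production it is
        // usually set to match the partition count of the Kafka source topic.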

        //1.1 Enable checkpointing and use the FS state backend (alternatives: memory, fs, rocksdb).
        //    Uncommenting this block also requires imports for FsStateBackend and CheckpointingMode.
      /*  System.setProperty("HADOOP_USER_NAME", "zhang");
        env.setStateBackend(new FsStateBackend("hdfs://hadoop302:8020/gmall-flink/ck"));
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);*/


        //TODO 2. Consume the Kafka ods_base_db topic to create the source stream
        String topic = "ods_base_db";
        String groupId = "base_db_app";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(topic, groupId));
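        // A minimal sketch of what MyKafkaUtil.getKafkaConsumer could look like; the
        // broker address and exact properties are assumptions, not taken from this repo:
        /*
        public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop302:9092"); // assumed broker
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        }
        */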

        //value: {"db":"","tn":"","before":{},"after":{},"type":""}
        //TODO 3. Convert each record into a JSONObject and filter out delete records (main stream)
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS
                .map(JSON::parseObject)
                // drop deletes: deletions are not propagated to the DWD/DIM layers
                .filter(jsonObject -> !"delete".equals(jsonObject.getString("type")));
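        // Illustrative record that passes the filter (shape matches the envelope above; values are made up):
        // {"db":"gmall","tn":"base_trademark","before":{},"after":{"id":"12","tm_name":"Huawei"},"type":"insert"}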

        //TODO 4. Use Flink CDC to consume the config table and turn it into a broadcast stream
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop302")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall_realtime")
                .tableList("gmall_realtime.table_process")
                .startupOptions(StartupOptions.initial())
                .deserializer(new FlinkCDC_WithDeserialization())
                .build();
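        // FlinkCDC_WithDeserialization (in app.function) is assumed to implement
        // DebeziumDeserializationSchema<String>, flattening each SourceRecord into the
        // {"db","tn","before","after","type"} envelope consumed above; a sketch:
        /*
        public class FlinkCDC_WithDeserialization implements DebeziumDeserializationSchema<String> {
            @Override
            public void deserialize(SourceRecord record, Collector<String> out) {
                JSONObject result = new JSONObject();
                // fill db/tn from record.topic(), before/after/type from record.value()
                out.collect(result.toJSONString());
            }
            @Override
            public TypeInformation<String> getProducedType() {
                return BasicTypeInfo.STRING_TYPE_INFO;
            }
        }
        */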
        DataStreamSource<String> tableProcessStrDS = env.addSource(sourceFunction);
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = tableProcessStrDS.broadcast(mapStateDescriptor);
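        // The TableProcess bean is assumed to mirror the gmall_realtime.table_process
        // config table; the field list below is an illustrative sketch:
        /*
        public class TableProcess {
            String sourceTable;  // source MySQL table
            String operateType;  // insert / update / delete
            String sinkType;     // "kafka" (fact) or "hbase" (dimension)
            String sinkTable;    // target Kafka topic or Phoenix table
            String sinkColumns;  // comma-separated columns to keep
            String sinkPk;       // primary key for the Phoenix table
            String sinkExtend;   // extra create-table clauses
        }
        */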
        //TODO 5. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject,String> connectedStream = jsonObjDS.connect(broadcastStream);
        //TODO 6. Split the stream: broadcast-side records update the config state,
        //        and main-stream records are routed according to that state
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase_tag"){};
        SingleOutputStreamOperator<JSONObject> kafkaJsonDS = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));
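        // TableProcessFunction is assumed to be a BroadcastProcessFunction that works
        // roughly as follows (a sketch of its contract, not the actual implementation):
        //  - processBroadcastElement: parse a config-table change into a TableProcess,
        //    pre-create the Phoenix table when sinkType is "hbase", and store the config
        //    in broadcast state under a "sourceTable-operateType" style key;
        //  - processElement: look up the config for the incoming record's table/type,
        //    strip columns not listed in sinkColumns, attach "sinkTable", and emit to
        //    the main output (Kafka) or the hbaseTag side output (Phoenix).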
        //TODO 7. Extract the Kafka (main output) and HBase (side output) streams
        DataStream<JSONObject> hbaseJsonDS = kafkaJsonDS.getSideOutput(hbaseTag);
        //TODO 8. Write the Kafka stream to its Kafka topics and the HBase stream to Phoenix tables
        hbaseJsonDS.print("HBase>>>>>>>>");
        kafkaJsonDS.print("Kafka>>>>>>>>");
        hbaseJsonDS.addSink(new DimSinkFunction());
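        // DimSinkFunction (in app.function) is assumed to extend RichSinkFunction<JSONObject>
        // and upsert each record into its Phoenix table; names and SQL below are an
        // illustrative sketch only (genUpsertSql is a hypothetical helper):
        /*
        @Override
        public void invoke(JSONObject value, Context context) throws Exception {
            // e.g. upsert into GMALL_REALTIME.DIM_XXX (id,tm_name) values ('12','Huawei')
            String upsertSql = genUpsertSql(value.getString("sinkTable"),      // hypothetical helper
                    JSON.parseObject(value.getString("after")));
            try (PreparedStatement ps = connection.prepareStatement(upsertSql)) {
                ps.execute();
                connection.commit(); // Phoenix connections do not auto-commit by default
            }
        }
        */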
        kafkaJsonDS.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject value, @Nullable Long timestamp) {
                // "sinkTable" (the target topic) and "after" (the row payload) are both
                // set on the record by TableProcessFunction upstream
                return new ProducerRecord<>(value.getString("sinkTable"),
                        value.getString("after").getBytes(StandardCharsets.UTF_8));
            }
        }));
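        // A sketch of the assumed MyKafkaUtil.getKafkaProducer overload: taking a
        // KafkaSerializationSchema lets a single producer route every record to the
        // topic named in its "sinkTable" field (broker list and default topic assumed):
        /*
        public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> schema) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop302:9092"); // assumed broker
            props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
            return new FlinkKafkaProducer<>("default_topic", schema, props,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        }
        */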
        //TODO 9. Start the job
        env.execute();

    }
}
